The framework comes with an import provider that uses the producer/consumer pattern to generate your CSEntryChanges and pass them back to the MIM sync engine.
This class abstracts away a lot of the boilerplate associated with generating CSEntryChange objects, and paging their results back to the MIM sync engine.
At a minimum, the following methods must be implemented.
CanImportAsync
This method tells the framework if this provider is capable of importing the specific object type.
GetObjectsAsync
This method is called at the start of the import operation. The method must return the models that represent the objects to be imported.
Typically this would be something like a 'get all users' call from the API.
Yield new objects as soon as they are available, as the consumer thread will start picking them up and converting them to CSEntryChanges.
GetAnchorAttributesAsync
Once an object has been picked up by the consumer thread, this method will be called to obtain the anchor attributes. Using the model passed into the method, create the relevant AnchorAttribute values and return them. They will be added to the resulting CSEntryChange
GetDNAsync
The consumer thread will call out to this method and request the DN that should be used for the given model object.
GetObjectModificationTypeAsync
The consumer thread will call out to this method to request the type of change to the object that is being processed. When in a full import operation, this will always be Add. In a delta import operation, determine the type of change that was made, and return the appopriate value.
CreateAttributeChangeAsync
For each attribute in the schema, the consumer will call this method and ask for relevant AttributeChange objects for the schema attribute in question.
If there are no changes to the specified attribute, simply return null.
GetOutboundWatermark
Once the producer thread has completed, this method is called to obtain the watermark value. This value will be provided on the next import to the GetObjectsAsync method. This allows a delta import to resume from the last known location.
If the management agent doesn't support delta imports, then simply return null.
usingSystem.Collections.Generic;usingSystem.Net.Http;usingSystem.Runtime.CompilerServices;usingSystem.Text.Json;usingSystem.Threading;usingSystem.Threading.Tasks;usingMicrosoft.Extensions.Logging;usingMicrosoft.MetadirectoryServices;namespaceLithnet.Ecma2Framework.Example{internalclassUserImportProvider:ProducerConsumerImportProvider<User> {privatereadonlyHttpClient client;privatereadonlyILogger<UserImportProvider> logger;publicUserImportProvider(HttpClient client,ILogger<UserImportProvider> logger) : base(logger) {this.client= client;this.logger= logger; } /// <summary> /// Determines if this provider can import objects of the specified type /// </summary> /// <paramname="type">The type of object to import</param>publicoverrideTask<bool> CanImportAsync(SchemaType type) {returnTask.FromResult(type.Name=="user"); } /// <summary> /// Gets an enumerable of objects to import /// </summary> /// <paramname="watermark">The incoming watermark for this object type</param> /// <paramname="cancellationToken">A cancellation token</param> /// <returns>An enumerable of objects to import</returns> protected override async IAsyncEnumerable<User> GetObjectsAsync(string watermark, [EnumeratorCancellation] CancellationToken cancellationToken)
{var result =awaitthis.client.GetAsync(this.client.BaseAddress+"/users"); result.EnsureSuccessStatusCode();var usersData =awaitresult.Content.ReadAsStringAsync();var users =JsonSerializer.Deserialize<List<User>>(usersData); this.logger.LogInformation("Retrieved {count} users", users.Count);foreach (User user in users) {yieldreturn user; } } /// <summary> /// This method gets the anchor attributes for the specified object /// </summary> /// <param name="item">The object returned from <see cref="GetObjectsAsync(string, CancellationToken)"/> which the anchor attributes are needed for</param>
/// <returns>A list of anchor attributes</returns>protectedoverride Task<List<AnchorAttribute>> GetAnchorAttributesAsync(User item) { List<AnchorAttribute> anchors = new List<AnchorAttribute>(); anchors.Add(AnchorAttribute.Create("id", item.Id)); return Task.FromResult(anchors); } /// <summary> /// This method builds the DN for the specified object /// </summary> /// <param name="item">The object returned from <see cref="GetObjectsAsync(string, CancellationToken)"/> which the DN is needed for</param>
/// <returns></returns>protectedoverrideTask<string> GetDNAsync(User item) {returnTask.FromResult(item.Id); } /// <summary> /// This method gets the modification type for the specified object. When performing a full import, this should always be <see cref="ObjectModificationType.Add"/>
/// </summary> /// <param name="item">The object returned from <see cref="GetObjectsAsync(string, CancellationToken)"/> which the modification type is needed for</param>
/// <returns>The type of modification to report for this object</returns>protectedoverrideTask<ObjectModificationType> GetObjectModificationTypeAsync(User item) {returnTask.FromResult(ObjectModificationType.Add); } /// <summary> /// Creates an attribute change for the specified schema attribute /// </summary> /// <paramname="type">The schema attribute to create the change for</param> /// <paramname="modificationType">The type of modification taking place on this object</param> /// <param name="item">The object returned from <see cref="GetObjectsAsync(string, CancellationToken)"/> which the attribute change is needed for</param>
/// <paramname="cancellationToken">A cancellation token</param> /// <returns>An attribute change, or null if there are no changes to report for the given attribute</returns> protected override Task<AttributeChange> CreateAttributeChangeAsync(SchemaAttribute type, ObjectModificationType modificationType, User item, CancellationToken cancellationToken)
{switch (type.Name) {case"name":returnTask.FromResult(AttributeChange.CreateAttributeAdd(type.Name,item.Name));case"email":returnTask.FromResult(AttributeChange.CreateAttributeAdd(type.Name,item.Email));case"phone":returnTask.FromResult(AttributeChange.CreateAttributeAdd(type.Name,item.Phone)); }returnTask.FromResult<AttributeChange>(null); } /// <summary> /// Optionally provides an outbound watermark value for the specified object type /// </summary> /// <paramname="type">The object type to get the watermark for</param> /// <paramname="cancellationToken">A cancellation token</param> /// <returns>An outbound watermark, or null if the management agent doesn't support delta operations</returns>publicoverrideTask<string> GetOutboundWatermark(SchemaType type,CancellationToken cancellationToken) {returnTask.FromResult<string>(null); } }}